package learnIgnite.compute;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCompute;
import org.apache.ignite.Ignition;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.resources.IgniteInstanceResource;

import learnIgnite.MyLifecycleBean;

/**
 * ignite 分布式计算
 * @author yangcheng  
 * @date 2020年4月9日  
 * @version V1.0
 */
public class ComputeTest {
	public static void main(String[] args) {
		IgniteConfiguration cfg = new IgniteConfiguration();
		//将事件监听注入到Ignite中
		cfg.setLifecycleBeans(new MyLifecycleBean());
		//以服务端模式启动
		cfg.setClientMode(false);
		Ignite ignite = Ignition.start(cfg);
		
		
		
		
		/**
		 * ignite的分布式计算
		 * 数据网格：Ignite内存数据网格是一个内存内的键值存储
		 * 
		 * 
		 * 
		 */
		
		//同步广播
		IgniteCompute compute = ignite.compute();
		compute.broadcast(new IgniteRunnable() {//源码可知：IgniteRunnable extends Runnable, Serializable
			
			@Override
			public void run() {
				// TODO Auto-generated method stub
				System.out.println("Hello Node: " + ignite.cluster().localNode().id());
			}
		});
		
		//异步广播---此方法暂时证明不可执行
//		IgniteCompute compute2 = ignite.compute().withAsync();
//		compute2.broadcast(new IgniteRunnable() {//源码可知：IgniteRunnable extends Runnable, Serializable
//		
//			@Override
//			public void run() {
//				System.out.println("Hello Node: " + ignite.cluster().localNode().id());
//			}
//		});
//		
//		ComputeTaskFuture<?> fut = compute2.future();
//		fut.listen(new IgniteInClosure<? super ComputeTaskFuture<?>>() {
//		    public void apply(ComputeTaskFuture<?> fut) {
//		        System.out.println("Finished sending broadcast job to cluster.");
//		    }
//		});
		//--------------------------------------------------------------------------------------------
		/**
		 * call 和 run 类型任务（单个任务或者任务集合）执行
		 * 
		 * IgniteCallable<V>接口继承了Callable<V>, Serializable接口
		 */
		
		Collection<IgniteCallable<Integer>> calls = new ArrayList<>();
		// Iterate through all words in the sentence and create callable jobs.
		for (final String word : "Count characters using callable".split(" ")) {
		    calls.add(new IgniteCallable<Integer>() {
		        @Override public Integer call() throws Exception {
		            return word.length(); // Return word length.
		        }
		    });
		}
		// Execute collection of callables on the cluster.
		Collection<Integer> res = ignite.compute().call(calls);
		int total = 0;
		// Total number of characters.
		// Looks much better in Java 8.
		for (Integer i : res){
			total += i;
		}
		System.out.println("分布式计算字符串字符长度总和为="+total);
		
		//--------------------------------------------------------------------------------------------
		
		/**
		 * apply()执行闭包(有异步执行方法)
		 * 闭包是一个代码块，它是把代码体和任何外部变量包装起来然后以一个函数对象的形式在内部使用它们，
		 * 然后可以在任何传入一个变量的地方传递这样一个函数对象，然后执行。
		 */
		
		
		IgniteCompute cmpute2 = ignite.compute();
		
		//IgniteClosure的两个参数分别为闭包的参数类型和闭包的返回类型
		Collection<Integer> rets = cmpute2.apply(new IgniteClosure<String, Integer>() {

			@Override
			public Integer apply(String e) {
				// 返回当前字符串的长度
				return e.length();
			}
		}, Arrays.asList("Count characters using closure".split(" ")));
		//遍历并整合分布式计算结果
		int sum = 0;
		for (int len : res)
		    sum += len;
		
		
		
		localState(ignite);
		
		
	}

	
	/**
	 * 节点内状态共享--ignite每个节点都有一个“共享并发node-local-map”
	 * @param ignite
	 */
	private static void localState(Ignite ignite){
		IgniteCallable<Long> job = new IgniteCallable<Long>() {
		/**
		 * 将上下文中的Ignite实例注入到当前变量中
		 */
		  @IgniteInstanceResource
		  private Ignite ignite2;

		  @Override
		  public Long call() {
		    // 从集群中获取nodeLocalMap(为jdk中的ConcurrentMap实例)
		    ConcurrentMap<String, AtomicLong> nodeLocalMap = ignite2.cluster().nodeLocalMap();
		    //获取计数器
		    AtomicLong cntr = nodeLocalMap.get("counter");

		    if (cntr == null) {
		      //当前新建的计数器放入localMap中同时返回原localMap
		      AtomicLong old = nodeLocalMap.putIfAbsent("counter", cntr = new AtomicLong());

		      if (old != null)
		        cntr = old;
		    }

		    return cntr.incrementAndGet();
		  }
		};
		ClusterGroup random = ignite.cluster().forRandom();
		IgniteCompute compute = ignite.compute(random);
		// The first time the counter on the picked node will be initialized to 1.
		Long res = compute.call(job);
		System.out.println( res == 1);
		// Now the counter will be incremented and will have value 2.
		res = compute.call(job);
		System.out.println( res == 2);
		
	}
}
